package com.mimo.logic.room.service.impl;

import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.mimo.common.cache.ICache;
import com.mimo.logic.room.service.IRoomService;

/**
 * Local (in-process) cache of rooms and their members.
 * <p>
 * Its main purpose is to let the Message module broadcast a message to everybody in a room
 * without querying the backing {@link IRoomService} on every send.
 * <p>
 * Member sets are kept in a Guava {@link LoadingCache} configured with soft values, a maximum
 * size and a write-expiry, so an entry may disappear at any time; it is then reloaded from the
 * {@link IRoomService}. Access to each room's member set is guarded by a per-room
 * {@link ReadWriteLock} stored in {@link #locks}.
 *
 * @author Hongyu
 */
public class RoomCache implements ICache<String, String[], String> {

  private static final Logger log = LoggerFactory.getLogger(RoomCache.class);

  /** Shared result returned when the room is unknown to the backing service. */
  private static final String[] EMPTY_ARRAYS = {};

  /** Entry lifetime after write, in minutes. */
  private static final int MAX_ENTRY_EXPIRE = 5;

  private static final int DEFAULT_INITIAL_CAPACITY = 1024;

  /** Maximum number of rooms held in the local cache. */
  private static final long MAX_ENTRY_SIZE = 2048;

  private final LoadingCache<String, Set<String>> localCache;

  /** One read/write lock per known room id. */
  private final Map<String, ReadWriteLock> locks;

  private final IRoomService source;

  private final ScheduledExecutorService scheduledThreadPool;

  /**
   * Creates the cache and starts the periodic clean-up task (first run after one minute, then
   * one minute after each run completes).
   *
   * @param src backing room service used to load members and to check room existence
   * @throws NullPointerException if {@code src} is {@code null}
   */
  public RoomCache(IRoomService src) {
    // Fail fast: the original Objects.nonNull(src) call discarded its result and checked nothing.
    this.source = Objects.requireNonNull(src, "src");
    this.locks = new ConcurrentHashMap<>(DEFAULT_INITIAL_CAPACITY);
    this.localCache = CacheBuilder.newBuilder().softValues().initialCapacity(DEFAULT_INITIAL_CAPACITY)
        .maximumSize(MAX_ENTRY_SIZE).expireAfterWrite(MAX_ENTRY_EXPIRE, TimeUnit.MINUTES).recordStats()
        .build(new CacheLoader<String, Set<String>>() {
          @Override
          public Set<String> load(String key) throws Exception {
            // Cache loading logic: fetch the room's members from the backing service.
            // NOTE(review): add()/remove() mutate the returned set in place, so this assumes
            // getUsersByRoom returns a non-null, mutable set - confirm against IRoomService.
            return source.getUsersByRoom(key);
          }
        });
    scheduledThreadPool = Executors.newScheduledThreadPool(1);
    scheduledThreadPool.scheduleWithFixedDelay(clearEmptyRoomTask(), 1, 1, TimeUnit.MINUTES);
  }

  /** Stops the clean-up task and drops every cached room. */
  @PreDestroy
  private void preDestory() {
    scheduledThreadPool.shutdownNow();
    this.clear();
  }

  /**
   * Registers a lock for {@code roomId} and, if the room has no cached entry yet, caches an empty
   * member set for it.
   *
   * @param roomId id of the room to create
   */
  @Override
  public void create(String roomId) {
    ReadWriteLock rwLocker = locks.computeIfAbsent(roomId, k -> new ReentrantReadWriteLock());

    // Fast path under the read lock: nothing to do when the entry already exists.
    rwLocker.readLock().lock();
    try {
      if (Objects.nonNull(localCache.getIfPresent(roomId))) {
        return;
      }
    } finally {
      rwLocker.readLock().unlock();
    }

    // Slow path: a read lock cannot be upgraded, so it was released above. Re-check under the
    // write lock because another thread may have created the entry in between. Each lock section
    // is balanced on its own, so a failure here can no longer trigger an unlock of a lock that
    // is not held.
    rwLocker.writeLock().lock();
    try {
      if (Objects.isNull(localCache.getIfPresent(roomId))) {
        localCache.put(roomId, new HashSet<>());
      }
    } finally {
      rwLocker.writeLock().unlock();
    }
  }

  /**
   * Adds {@code element} to the room's member set.
   *
   * @param roomId  id of the room
   * @param element member to add
   * @return a snapshot of the members after the addition, or an empty array when no lock could
   *         be obtained for the room (the backing service reports it does not exist)
   */
  @Override
  public String[] add(String roomId, String element) {
    ReadWriteLock rwlocker = getLockByRoomId(roomId);
    if (Objects.isNull(rwlocker)) {
      return EMPTY_ARRAYS;
    }
    rwlocker.writeLock().lock();
    try {
      Set<String> es = loadMembers(roomId);
      es.add(element);
      return snapshot(es);
    } finally {
      rwlocker.writeLock().unlock();
    }
  }

  /**
   * Removes {@code element} from the room's member set. Does nothing when no lock could be
   * obtained for the room.
   *
   * @param roomId  id of the room
   * @param element member to remove
   */
  @Override
  public void remove(String roomId, String element) {
    ReadWriteLock rwlocker = getLockByRoomId(roomId);
    if (Objects.isNull(rwlocker)) {
      return;
    }
    rwlocker.writeLock().lock();
    try {
      loadMembers(roomId).remove(element);
    } finally {
      rwlocker.writeLock().unlock();
    }
  }

  /**
   * Returns a snapshot of the room's members.
   *
   * @param roomId id of the room
   * @return the current members, or an empty array when no lock could be obtained for the room
   */
  @Override
  public String[] get(String roomId) {
    ReadWriteLock rwlocker = getLockByRoomId(roomId);
    if (Objects.isNull(rwlocker)) {
      return EMPTY_ARRAYS;
    }

    // Fast path: the entry is still cached, copy it under the read lock.
    rwlocker.readLock().lock();
    try {
      Set<String> es = localCache.getIfPresent(roomId);
      if (Objects.nonNull(es)) {
        return snapshot(es);
      }
    } finally {
      rwlocker.readLock().unlock();
    }

    // The entry is gone (the cache uses soft values, a size limit and an expiry, so it can be
    // dropped at any time): reload it under the write lock. If the loader fails, only the write
    // lock is released and the loader's exception propagates unmasked.
    rwlocker.writeLock().lock();
    try {
      return snapshot(localCache.getUnchecked(roomId));
    } finally {
      rwlocker.writeLock().unlock();
    }
  }

  /** Clears every room that currently has a registered lock. */
  @Override
  public void clear() {
    locks.keySet().forEach(this::clear);
  }

  /**
   * Unregisters the room's lock and invalidates its cache entry. Does nothing when the room has
   * no registered lock.
   *
   * @param roomId id of the room to clear
   */
  @Override
  public void clear(String roomId) {
    // NOTE(review): the lock is removed from the map before it is acquired, so a concurrent
    // caller may register a fresh lock for the same room while this invalidation is in progress.
    ReadWriteLock rwlocker = locks.remove(roomId);
    if (Objects.nonNull(rwlocker)) {
      rwlocker.writeLock().lock();
      try {
        localCache.invalidate(roomId);
      } finally {
        rwlocker.writeLock().unlock();
      }
    }
  }

  /**
   * Returns the lock registered for {@code roomId}, registering a new one when the backing
   * service reports that the room exists.
   *
   * @return the room's lock, or {@code null} when none is registered and the room does not exist
   */
  private ReadWriteLock getLockByRoomId(String roomId) {
    // A null result from the mapping function records no mapping and is returned as null.
    return locks.computeIfAbsent(roomId, k -> source.exists(roomId) ? new ReentrantReadWriteLock() : null);
  }

  /** Returns the cached member set for the room, loading it through the cache loader if absent. */
  private Set<String> loadMembers(String roomId) {
    Set<String> es = localCache.getIfPresent(roomId);
    return Objects.nonNull(es) ? es : localCache.getUnchecked(roomId);
  }

  /** Copies the member set into a fresh array; callers must hold the room's lock. */
  private static String[] snapshot(Set<String> es) {
    return es.toArray(new String[0]);
  }

  /**
   * Builds the periodic correction task for the case where the local cache was not cleaned up in
   * time (which should not happen under normal circumstances): every room that has a registered
   * lock but no longer exists in the backing service is cleared.
   *
   * @return the clean-up task; failures are logged and never propagate, so the schedule keeps
   *         running
   */
  private Runnable clearEmptyRoomTask() {
    return () -> {
      try {
        locks.keySet().stream().filter(r -> !source.exists(r)).forEach(this::clear);
      } catch (Exception ex) {
        log.error("房间缓存定时清除失败", ex);
      }
    };
  }

  /**
   * @return the statistics recorded by the underlying Guava cache
   */
  @Override
  public Object getStatistics() {
    return this.localCache.stats();
  }
}
